#!/usr/bin/env perl

use warnings;
use strict;
use feature 'state';

use English;
use FindBin;
use lib "$FindBin::Bin/../../lib";

use Getopt::Long;
use DBI;
use DBDefs;
use JSON::XS;
use List::AllUtils qw( zip );
use MusicBrainz::Script::Utils qw( get_primary_keys );
use MusicBrainz::Server::Context;
use Sql;
use Types::Serialiser;

# Option defaults. Each of these may be overridden on the command line by the
# GetOptions call below.
my $show_long_sql = 0;
my $show_short_sql = 0;
my $ignore_conflicts = 0;
# True: read changes from dbmirror2.pending_data. False (--no-dbmirror2): read
# them from the legacy dbmirror_pending / dbmirror_pendingdata tables.
my $dbmirror2 = 1;
my $debug_xact = 0;
my $skip_seqid;
my $limit = 10000;
my $database = 'READWRITE';

# The heredoc interpolates, so the advertised --limit default always matches
# the value assigned above.
my $help = <<EOF;
Usage: ProcessReplicationChanges [OPTIONS]

Options are:
        --short-sql         Show a summary of each "write" statement executed
        --long-sql          Show the full text of each "write" statement executed
        --ignore-conflicts  Ignore any sequences that conflict with existing rows
        --[no-]dbmirror2    Read changes from dbmirror2.pending_data (default);
                            --no-dbmirror2 reads the legacy dbmirror_pending
                            tables instead
        --debug-xact        Show when each transaction starts / ends
    -s, --skip-seqid=N      Ignore SeqId's up to and including SeqId N
        --limit=N           Update, insert, or delete at most N rows at a time
                            (N must be at least 1).
                            This option is useful if you have a system with a small
                            amount of memory - a setting of 10,000 works well for
                            machines with 64MB RAM. The default is $limit.
        --database=NAME     Database to connect to (default: READWRITE)
    -h, --help              Show this help

EOF

GetOptions(
    'short-sql'         => \$show_short_sql,
    'long-sql'          => \$show_long_sql,
    'ignore-conflicts'  => \$ignore_conflicts,
    'dbmirror2!'        => \$dbmirror2,
    'debug-xact'        => \$debug_xact,
    'skip-seqid|s=i'    => \$skip_seqid,
    'limit=i'           => \$limit,
    'database=s'        => \$database,
    'help|h'            => sub { print $help; exit },
) or exit 2;

# No positional arguments are accepted.
print($help), exit 2 if @ARGV;

# $limit is interpolated into "FETCH FORWARD $limit FROM csr". A value of 0
# fetches nothing, which would end the per-transaction loop immediately and
# let the pending rows be deleted without ever being applied; a negative
# value asks a NO SCROLL cursor to move backwards. Reject both up front.
if ($limit < 1) {
    print STDERR "--limit must be a positive integer (got $limit)\n";
    exit 2;
}

# Flush STDOUT after every write so progress output shows up immediately.
$OUTPUT_AUTOFLUSH = 1;
print scalar(localtime) . " : Processing replication changes\n";

# Turn Ctrl-C into an ordinary exception.
$SIG{INT} = sub { die "SIGINT\n" };

# Running totals of applied changes, overall and per table. Each %bytable
# value is an array ref of [inserts, updates, deletes].
my $ins = 0;
my $del = 0;
my $upd = 0;
my %bytable;

my $c = MusicBrainz::Server::Context->create_script_context(
    database => $database,
);

# Decoder/encoder for the JSON row images in dbmirror2 packets: JSON booleans
# decode to plain 0 / 1, and output is not pretty-printed.
my $json = JSON::XS->new->boolean_values(0, 1)->pretty(0);

# Three wrappers around the same connection ($c->conn), one per role:
#  * $sql                 - iterates over the ordered transaction (X) IDs and
#                           runs the miscellaneous queries at the start and
#                           end of the script;
#  * $fetching_change_sql - iterates, through a cursor, over the pending
#                           changes belonging to the current transaction ID;
#  * $applying_change_sql - applies those changes, in one separate
#                           transaction per transaction ID.
my $sql = Sql->new($c->conn);
my $fetching_change_sql = Sql->new($c->conn);
my $applying_change_sql = Sql->new($c->conn);

# Session-wide settings, each issued as its own auto-committed statement.
for my $session_setting (
    'SET search_path = musicbrainz, public',
    'SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL REPEATABLE READ',
) {
    $sql->auto_commit;
    $sql->do($session_setting);
}

# $query lists the pending transactions ordered by our approximation of their
# commit time, which is taken to be the SeqId of the last row edit in the
# transaction. $total_statements is the number of pending row changes and is
# used for the progress percentage.
my ($query, $total_statements);
if ($dbmirror2) {
    $query = 'SELECT xid, max(seqid) FROM dbmirror2.pending_data GROUP BY xid ORDER BY max(seqid)';
    $total_statements = $sql->select_single_value(
        'SELECT count(*) FROM dbmirror2.pending_data',
    );
}
else {
    $query = 'SELECT pd.XID,MAX(SeqId) FROM dbmirror_pending pd GROUP BY pd.XID ORDER BY MAX(pd.SeqId)';
    $total_statements = $sql->select_single_value(
        'SELECT COUNT(*) FROM dbmirror_pending',
    );
}

# Walk the pending transactions (XIDs) in the order produced by $query.
# For each XID, its pending row changes are read through a cursor, applied to
# the database, removed from the pending table, and committed as a single
# transaction.
if (my $totalrows = $sql->select($query))
{
    my @row;

    use Time::HiRes qw( gettimeofday tv_interval );
    # Start time, used to compute the XIDs/sec and Stmt/sec rates.
    my $t1 = [gettimeofday];
    my $interval;
    # $rows counts committed XIDs; $strows counts processed change rows.
    my $rows = 0;
    my $strows = 0;

    printf "%9s %9s %4s %9s %9s\n", 'XIDs', 'Stmts', 'est%', 'XIDs/sec', 'Stmt/sec';

    # Progress printer. $pre / $post are prepended / appended to the format
    # string (callers pass "\r" to overwrite the previous progress line).
    # It also records when it last ran, for the throttling check below.
    my $lasttime;
    my $p = sub {
        my ($pre, $post) = @_;
        no integer;
        printf $pre.'%9d %9d %3d%% %9d %9d'.$post,
                $rows, $strows,
                int(100 * $strows / $total_statements),
                $rows / ($interval||1),
                $strows / ($interval||1),
                ;
        $lasttime = time;
    };

    # With any of the verbose options the progress line is not printed at all;
    # the replacement only keeps $lasttime up to date.
    # NOTE(review): presumably so the "\r" progress line does not interleave
    # with the per-statement output - confirm.
    $p = sub { $lasttime = time }
        if $show_short_sql or $show_long_sql or $debug_xact;

        $p->('', '');

    while (@row = $sql->next_row)
    {
        my $XID = $row[0];
        my $maxSeqId = $row[1];
        my $seqId;

        # --skip-seqid: leave this transaction untouched. Note that its rows
        # are not deleted from the pending table either.
        if ($skip_seqid and $maxSeqId <= $skip_seqid)
        {
                print localtime() . " : Ignoring SeqId #$maxSeqId\n";
                next;
        }

        # Legacy dbmirror layout: one row per (change, data part); key data
        # sorts before the new-value data of the same SeqId (IsKey DESC).
        my $query = q{
                SELECT  pnd.SeqId, pnd.TableName, pnd.Op, pnddata.IsKey, pnddata.Data AS Data
                FROM    dbmirror_pending pnd, dbmirror_pendingdata pnddata
                WHERE   pnd.SeqId = pnddata.SeqId
                AND             pnd.XID = ?
                ORDER BY SeqId, IsKey DESC
        };

        # dbmirror2 layout: one row per change, carrying both the old and the
        # new row image plus the table's key columns (NULL when
        # pending_keys has no entry for the table).
        if ($dbmirror2) {
            $query = <<~'SQL';
                   SELECT pd.seqid,
                          pd.tablename,
                          pd.op,
                          pd.olddata,
                          pd.newdata,
                          pk.keys
                     FROM dbmirror2.pending_data pd
                LEFT JOIN dbmirror2.pending_keys pk
                       ON pk.tablename = pd.tablename
                    WHERE pd.xid = ?
                 ORDER BY pd.seqid ASC
                SQL
        }

        # One transaction per XID, with constraint checks postponed until
        # commit. The cursor is declared after the transaction has begun.
        $applying_change_sql->begin;
        $applying_change_sql->do('SET CONSTRAINTS ALL DEFERRED');

        $fetching_change_sql->do("DECLARE csr NO SCROLL CURSOR FOR $query", $XID);

        my @row2;
        # Fetches the next batch of at most $limit rows from the cursor. Its
        # return value (whatever select() returns) drives the loop below.
        # NOTE(review): this relies on select() returning false once the
        # cursor is exhausted - confirm against Sql::select.
        my $pull = sub {
            $fetching_change_sql->finish;
            $fetching_change_sql->select("FETCH FORWARD $limit FROM csr");
        };
        while ($pull->())
        {
            while (@row2 = $fetching_change_sql->next_row)
            {
                # TODO: Figure out how to handle errors here
                if ($dbmirror2) {
                    if (!dbmirror2_command($applying_change_sql, \@row2)) {
                        die "Mirror command failed.\n";
                    }
                }
                # The legacy handler also gets the fetching wrapper and $pull,
                # because an UPDATE consumes a second row (the new values),
                # which may be in the next batch.
                elsif (!mirrorCommand($row2[2], $fetching_change_sql, $applying_change_sql, \@row2, $XID, $pull))
                {
                    die "Mirror command failed.\n";
                }
                ++$strows;
            }
        }
        $fetching_change_sql->finish;

        # Done with this XID: close the cursor and remove its pending rows
        # before committing.
        $fetching_change_sql->do('CLOSE csr');
        $applying_change_sql->do(
            $dbmirror2
                ? 'DELETE FROM dbmirror2.pending_data WHERE xid = ?'
                : 'DELETE FROM dbmirror_pending WHERE XID = ?',
            $XID,
        );

        print localtime() . " : COMMIT XID #$XID\n" if $debug_xact;
        $applying_change_sql->commit;

        ++$rows;
        # Throttle the progress line: refresh it when $rows is a multiple of
        # 32, or when the clock second has changed since the last refresh.
        unless ($rows & 0x1F and time() == $lasttime)
        {
                $interval = tv_interval($t1);
                $p->("\r", '');
        }
    }

    # Final progress line, including the total elapsed time.
    $interval = tv_interval($t1);
    $p->("\r", sprintf(" %.2f sec\n", $interval));

}

# The dbmirror2 auxiliary tables are emptied once all changes have been
# processed; each TRUNCATE is issued as its own auto-committed statement.
if ($dbmirror2) {
    for my $aux_table (qw( dbmirror2.pending_keys dbmirror2.pending_ts )) {
        $sql->auto_commit;
        $sql->do("TRUNCATE $aux_table");
    }
}

$sql->finish;

# Report how many inserts / updates / deletes were applied, per table and in
# total. The %bytable entries are stored as [inserts, updates, deletes].
{
    my $row_format = "%7d  %7d  %7d  %s\n";

    print localtime() . " : Summary of changes applied:\n";
    print "Inserts  Updates  Deletes  Table\n";
    for my $table_name (sort keys %bytable) {
        my ($inserted, $updated, $deleted) = @{ $bytable{$table_name} };
        printf($row_format, $inserted, $updated, $deleted, $table_name);
    }
    printf($row_format, $ins, $upd, $del, '(total)');
}

print localtime() . " : Replication changes applied\n";

{
    # Every pending table used by the selected packet format must be empty by
    # now; anything left over is treated as a fatal error.
    my @pending_tables = $dbmirror2
        ? ('dbmirror2.pending_data')
        : qw( dbmirror_pendingdata dbmirror_pending );

    for my $table (@pending_tables) {
        print localtime() . " : Checking that $table is empty\n";

        my $leftover = $sql->select_single_value("SELECT COUNT(*) FROM $table");
        if ($leftover) {
            die "$table still contains data (rows=$leftover)!";
        }

        print localtime() . " : Optimising $table\n";
        $sql->auto_commit;
        $sql->do("VACUUM ANALYZE $table");
    }
}

print localtime() . " : ProcessReplicationChanges complete\n";
exit;

# Dispatches one legacy (dbmirror) change row to the matching handler and
# updates the global and per-table counters.
#
#   $op                  - operation code: 'i', 'd' or 'u'
#   $result              - wrapper the change rows are being read from
#   $applying_change_sql - wrapper the change is applied through
#   $row                 - array ref for the current row; element 1 (the table
#                          name) is rewritten in place without a leading
#                          "public". qualifier
#   $transId             - transaction (X) ID the row belongs to
#   $pull                - callback fetching the next batch of rows (only
#                          forwarded to mirrorUpdate)
#
# Returns the handler's return value, or 0 for an unrecognised operation.
sub mirrorCommand
{
    my ($op, $result, $applying_change_sql, $row, $transId, $pull)  = @_;

    $row->[1] =~ s/^"public"\.//;
    my $table = $row->[1];

    # Per-table tally, laid out as [inserts, updates, deletes].
    my $tally = ($bytable{$table} ||= [0,0,0]);

    if ($op eq 'i') {
        ++$ins;
        ++$tally->[0];
        return mirrorInsert($result, $applying_change_sql, $row, $transId);
    }

    if ($op eq 'd') {
        ++$del;
        ++$tally->[2];
        return mirrorDelete($result, $applying_change_sql, $row, $transId);
    }

    if ($op eq 'u') {
        ++$upd;
        ++$tally->[1];
        return mirrorUpdate($result, $applying_change_sql, $row, $transId, $pull);
    }

    return 0;
}

# Applies one dbmirror2 change row and updates the global and per-table
# counters.
#
#   $applying_change_sql - wrapper the change is applied through
#   $row                 - array ref of (seqid, tablename, op, olddata,
#                          newdata, keys) as selected from
#                          dbmirror2.pending_data / pending_keys
#
# Returns the handler's return value, or 0 for an unrecognised operation.
sub dbmirror2_command {
    my ($applying_change_sql, $row)  = @_;

    my (undef, $table, $op, $olddata, $newdata, $keys) = @{$row};

    # The row images are JSON text; an absent image stays undef.
    $olddata = $json->decode($olddata) if defined $olddata;
    $newdata = $json->decode($newdata) if defined $newdata;

    # Some dbmirror2 packets have an incomplete pending_keys file due to a
    # (now fixed) bug, and a few others have no pending_keys file at all.
    # In both cases the primary keys are looked up in the database instead,
    # once per table, and remembered for the rest of the run.
    unless (defined $keys) {
        state $fetched_keys = {};
        unless (defined $fetched_keys->{$table}) {
            warn(
                "Primary keys for $table were not found in the " .
                "pending_keys file. Fetching them from the database."
            );
            my ($schema_part, $table_part) = @{ $applying_change_sql->select_single_value(
                'SELECT parse_ident(?)',
                $table,
            ) };
            $fetched_keys->{$table} =
                [get_primary_keys($applying_change_sql, $schema_part, $table_part)];
        }
        $keys = $fetched_keys->{$table};
    }

    # Per-table tally, laid out as [inserts, updates, deletes].
    my $tally = ($bytable{$table} ||= [0, 0, 0]);

    if ($op eq 'i') {
        ++$ins;
        ++$tally->[0];
        return dbmirror2_insert($applying_change_sql, $table, $newdata);
    }

    if ($op eq 'd') {
        ++$del;
        ++$tally->[2];
        return dbmirror2_delete($applying_change_sql, $table, $olddata, $keys);
    }

    if ($op eq 'u') {
        ++$upd;
        ++$tally->[1];
        return dbmirror2_update($applying_change_sql, $table, $olddata, $newdata, $keys);
    }

    return 0;
}

use MusicBrainz::Server::dbmirror;

# Applies one legacy (dbmirror) INSERT.
#
#   $result              - unused here; kept for a uniform handler signature
#   $applying_change_sql - wrapper the statement is executed through
#   $row                 - array ref for the change row: element 0 is the
#                          SeqId, 1 the table name, 4 the packed column data
#   $transId             - unused here
#
# Dies if the packed data cannot be unpacked; returns 1 otherwise.
sub mirrorInsert
{
    my ($result, $applying_change_sql, $row, $transId)  = @_;
    my ($seqId, $tableName, $packed) = @{$row}[0, 1, 4];

    my $valuepairs = MusicBrainz::Server::dbmirror::unpack_data($packed, $seqId)
        or die;

    my ($statement, $args) =
        MusicBrainz::Server::dbmirror::prepare_insert($tableName, $valuepairs);

    # --ignore-conflicts: let the database skip rows that already exist.
    $statement .= ' ON CONFLICT DO NOTHING' if $ignore_conflicts;

    if ($show_short_sql) {
        print localtime() . " : INSERT INTO $tableName\n";
    }
    if ($show_long_sql) {
        show_long_sql($statement, $args);
    }

    $applying_change_sql->do($statement, @$args);

    return 1;
}

# Helper for get_bind_value: walks a (possibly multi-dimensional) array of
# decoded JSON values and returns a structure of the same shape in which every
# non-array leaf has been re-encoded as JSON text.
#
# This is a named sub rather than a closure that refers to itself: such a
# closure forms a reference cycle that is never freed, and the leaf must be
# encoded from the explicit argument rather than from whatever $_ happens to
# hold in the caller.
sub _encode_json_array_leaves {
    my ($node) = @_;

    if (ref($node) eq 'ARRAY') {
        return [map { _encode_json_array_leaves($_) } @$node];
    }
    return $json->encode($node);
}

# Converts a value decoded from a dbmirror2 JSON row image into the form that
# should be bound to a statement placeholder for the given column.
#
#   $applying_change_sql - wrapper used to look up the column's type name
#   $table, $column      - identify the column
#   $value               - decoded JSON value (may be undef for NULL)
#
# Returns exactly one value in every case, because callers use this inside
# `map` to build positional bind lists. That is why the NULL case says
# `return undef` instead of a bare `return`.
sub get_bind_value {
    my ($applying_change_sql, $table, $column, $value) = @_;

    return undef unless defined $value;

    my $type_name = $applying_change_sql->get_column_type_name($table, $column);

    # Most likely not needed, as we set `boolean_values` to (0, 1)
    # on the Perl JSON decoder.
    if ($type_name eq 'boolean') {
        return $value ? 1 : 0;
    }

    # TIMESTAMP WITH TIME ZONE, when cast to TEXT, omits the "T"
    # separator, while the JSON-encoded version (from the postgres JSON
    # encoder) does not. The JSON version also includes ":00" in the
    # offset.
    if ($type_name eq 'timestamp with time zone') {
        $value =~ s/T/ /;
        $value =~ s/:00$//;
        return $value;
    }

    # JSON columns were decoded into Perl data along with the rest of the row
    # image, so they have to be serialised again before binding.
    if ($type_name eq 'json' || $type_name eq 'jsonb') {
        return $json->encode($value);
    }

    # Arrays of JSON: keep the array structure and encode each leaf.
    if ($type_name eq 'json[]' || $type_name eq 'jsonb[]') {
        return _encode_json_array_leaves($value);
    }

    return $value;
}

# Splits a decoded row image (hash ref of column => value) into two parallel
# array refs: the column names in sorted order, and the corresponding bind
# values as produced by get_bind_value.
#
# Returns nothing (an empty list) when $data is undef.
sub get_sorted_columns_and_values {
    my ($applying_change_sql, $table, $data) = @_;

    return unless defined $data;

    my @columns = sort keys %{$data};

    my @values;
    for my $column (@columns) {
        push @values,
            get_bind_value($applying_change_sql, $table, $column, $data->{$column});
    }

    return (\@columns, \@values);
}

# Checks that $id is safe to interpolate into SQL without quoting and returns
# it unchanged; dies with "Unsupported identifier: ..." otherwise.
sub valid_identifer {
    my $id = shift;

    # All MB table and column names should use only ASCII letters, numbers,
    # and underscores. We mainly check this so we don't have to quote
    # identifiers properly.
    #
    # The pattern is anchored with \A and \z rather than ^ and $, because $
    # also matches just before a trailing newline and would therefore accept
    # an identifier such as "name\n".
    die "Unsupported identifier: $id"
        unless $id =~ /\A[A-Za-z_][A-Za-z0-9_]*\z/a;

    return $id;
}

# Tells whether two bind values differ, treating undef (NULL) as a value of
# its own: undef equals only undef, and two defined values are compared as
# strings.
sub column_values_differ {
    my ($old_value, $new_value) = @_;

    # Both sides present: a plain string comparison decides.
    if (defined($old_value) && defined($new_value)) {
        return $old_value ne $new_value;
    }

    # At least one side is undef: they differ unless the other one is too.
    return defined($old_value) ne defined($new_value);
}

# Sanity check run before a dbmirror2 UPDATE or DELETE: compares the row
# currently stored under the given key with the "old" row image from the
# replication packet and warns about every non-key column that differs. Also
# warns when no such row exists although the table is not empty. It never
# dies on a mismatch and does not modify any data.
#
#   $applying_change_sql - wrapper used for all lookups
#   $table               - table name as found in the packet
#   $olddata             - decoded old row image (hash ref)
#   $keys                - array ref of key column names
#   $conditions          - WHERE clause text with one placeholder per key
#   $key_data            - array ref of bind values, parallel to $keys
sub warn_if_existing_data_differs {
    my ($applying_change_sql, $table, $olddata, $keys, $conditions, $key_data) = @_;

    my $row = $applying_change_sql->select_single_row_hash(
        "SELECT * FROM $table WHERE $conditions",
        @$key_data,
    );

    # $keys and $key_data are parallel arrays.
    my %key_map;
    @key_map{@$keys} = @$key_data;

    my $key_string = join q( ), map {
        $_ . '=' . $applying_change_sql->dbh->quote($key_map{$_})
    } @$keys;

    if (!defined $row) {
        # Only the existence of a row matters here, so fetch at most one
        # instead of the whole table.
        my $is_non_empty = $applying_change_sql->select_single_value(
            "SELECT 1 FROM $table LIMIT 1",
        );
        warn "No row found in $table with keys $key_string."
            if $is_non_empty;
        return;
    }

    # Sorted so that the warnings come out in a stable order.
    for my $col (sort keys %{$row}) {
        next if exists $key_map{$col};

        my $packet_value = get_bind_value($applying_change_sql, $table, $col, $olddata->{$col});
        my $actual_value = $row->{$col};

        my $type_name = $applying_change_sql->get_column_type_name($table, $col);
        # postgres has no equality operator for json, only jsonb
        $type_name =~ s/json(?!b)/jsonb/;

        my $comparison_query;
        if ($type_name eq 'point') {
            # Two points are considered equivalent if the distance
            # between them is 0.
            $comparison_query = "SELECT (?::point <-> ?::point) != 0";
        } else {
            $comparison_query =
                "SELECT ?::$type_name IS DISTINCT FROM ?::$type_name";
        }

        # Let the database compare the two values using the column's own type.
        next unless $applying_change_sql->select_single_value(
            $comparison_query,
            $packet_value,
            $actual_value,
        );

        # Either side may legitimately be NULL; show that explicitly rather
        # than interpolating undef.
        my $shown_actual = $actual_value // 'NULL';
        my $shown_packet = $packet_value // 'NULL';
        warn (
            "The current row in $table with key $key_string " .
            "contains a different value in column $col " .
            "($shown_actual) than the replication packet " .
            "suggests it should have as the old value " .
            "($shown_packet)."
        );
    }

    return;
}

# Applies one dbmirror2 INSERT built from the new row image.
#
#   $applying_change_sql - wrapper the statement is executed through
#   $table               - table name as found in the packet
#   $newdata             - decoded new row image (hash ref)
#
# Column names are checked with valid_identifer (which dies on anything
# unexpected). Returns 1.
sub dbmirror2_insert {
    my ($applying_change_sql, $table, $newdata)  = @_;

    my ($columns, $values) =
        get_sorted_columns_and_values($applying_change_sql, $table, $newdata);

    my @checked_columns = map { valid_identifer($_) } @$columns;
    my $column_list = join q(, ), @checked_columns;
    my $placeholder_list = join q(, ), map { '?' } @checked_columns;

    my $statement = "INSERT INTO $table ($column_list) VALUES ($placeholder_list)";

    # --ignore-conflicts: let the database skip rows that already exist.
    $statement .= ' ON CONFLICT DO NOTHING' if $ignore_conflicts;

    if ($show_short_sql) {
        print localtime() . " : INSERT INTO $table\n";
    }
    if ($show_long_sql) {
        show_long_sql($statement, $values);
    }

    $applying_change_sql->do($statement, @$values);

    return 1;
}

# Applies one legacy (dbmirror) DELETE.
#
#   $result              - unused here; kept for a uniform handler signature
#   $applying_change_sql - wrapper the statement is executed through
#   $row                 - array ref for the change row: element 0 is the
#                          SeqId, 1 the table name, 4 the packed key data
#   $transId             - unused here
#
# Dies if the packed data cannot be unpacked; returns 1 otherwise.
sub mirrorDelete
{
    my ($result, $applying_change_sql, $row, $transId) = @_;
    my ($seqId, $tableName, $packed) = @{$row}[0, 1, 4];

    my $keypairs = MusicBrainz::Server::dbmirror::unpack_data($packed, $seqId)
        or die;

    my ($statement, $args) =
        MusicBrainz::Server::dbmirror::prepare_delete($tableName, $keypairs);

    if ($show_short_sql) {
        print localtime() . " : DELETE FROM $tableName\n";
    }
    if ($show_long_sql) {
        show_long_sql($statement, $args);
    }

    $applying_change_sql->do($statement, @$args);

    return 1;
}

# Applies one dbmirror2 DELETE, locating the row by its key columns as they
# appear in the old row image.
#
#   $applying_change_sql - wrapper the statement is executed through
#   $table               - table name as found in the packet
#   $olddata             - decoded old row image (hash ref)
#   $keys                - array ref of key column names
#
# Unless --ignore-conflicts is set, the stored row is first compared with the
# old row image and any mismatch is reported as a warning. Returns 1.
sub dbmirror2_delete {
    my ($applying_change_sql, $table, $olddata, $keys)  = @_;

    # All key names are validated before any value is looked up.
    my @key_clauses = map { valid_identifer($_) . ' = ?' } @$keys;
    my $conditions = join ' AND ', @key_clauses;

    my @key_data;
    for my $key (@$keys) {
        push @key_data,
            get_bind_value($applying_change_sql, $table, $key, $olddata->{$key});
    }

    if (!$ignore_conflicts) {
        warn_if_existing_data_differs(
            $applying_change_sql, $table, $olddata,
            $keys, $conditions, \@key_data,
        );
    }

    my $statement = "DELETE FROM $table WHERE $conditions";

    if ($show_short_sql) {
        print localtime() . " : DELETE FROM $table\n";
    }
    if ($show_long_sql) {
        show_long_sql($statement, \@key_data);
    }

    $applying_change_sql->do($statement, @key_data);

    return 1;
}

# Applies one legacy (dbmirror) UPDATE. An update occupies two consecutive
# rows in the change stream: the current row carries the packed key data and
# the following row carries the packed new column values.
#
#   $result              - wrapper the change rows are being read from; the
#                          follow-up row is consumed from it
#   $applying_change_sql - wrapper the statement is executed through
#   $row                 - array ref for the key row: element 0 is the SeqId,
#                          1 the table name, 4 the packed key data
#   $transId             - unused here
#   $pull                - callback fetching the next batch of rows, used when
#                          the follow-up row is not in the current batch
#
# Dies if either part cannot be unpacked or the follow-up row is missing;
# returns 1 otherwise.
sub mirrorUpdate
{
    my ($result, $applying_change_sql, $row, $transId, $pull) = @_;
    my $seqId = $row->[0];
    my $tableName = $row->[1];

    my $keypairs = MusicBrainz::Server::dbmirror::unpack_data($row->[4], $seqId)
        or die;

    # The new values may have ended up in the next batch if the current one
    # was exhausted right after the key row.
    my @row2 = $result->next_row;
    if (!@row2) {
        unless ($pull->() and @row2 = $result->next_row) {
            die 'Unable to find required data to perform UPDATE';
        }
    }

    # Checked like every other unpack_data call in this file, so that a
    # failure is reported here instead of surfacing later in prepare_update.
    my $valuepairs = MusicBrainz::Server::dbmirror::unpack_data($row2[4], $seqId)
        or die 'Unable to unpack new column values to perform UPDATE';

    my ($statement, $args) = MusicBrainz::Server::dbmirror::prepare_update($tableName, $valuepairs, $keypairs);

    print localtime() . " : UPDATE $tableName\n" if $show_short_sql;
    show_long_sql($statement, $args) if $show_long_sql;
    $applying_change_sql->do($statement, @$args);

    return 1;
}

# Applies one dbmirror2 UPDATE. Only the columns whose bind value differs
# between the old and the new row image are written; the row is located by its
# key columns as they appear in the old row image.
#
#   $applying_change_sql - wrapper the statement is executed through
#   $table               - table name as found in the packet
#   $olddata, $newdata   - decoded old / new row images (hash refs)
#   $keys                - array ref of key column names
#
# Unless --ignore-conflicts is set, the stored row is first compared with the
# old row image and any mismatch is reported as a warning. Returns 1, also
# when there was nothing to change.
sub dbmirror2_update {
    my ($applying_change_sql, $table, $olddata, $newdata, $keys)  = @_;

    my $conditions = join ' AND ',
        map { valid_identifer($_) . ' = ?' }
        @$keys;
    my @key_data = map {
        get_bind_value($applying_change_sql, $table, $_, $olddata->{$_})
    } @$keys;

    warn_if_existing_data_differs(
        $applying_change_sql, $table, $olddata,
        $keys, $conditions, \@key_data,
    ) unless $ignore_conflicts;

    my (@changed_columns, @changed_values);

    # Values are compared in their bind form, so both images go through the
    # same normalisation first.
    for my $column (sort keys %{$newdata}) {
        my $old_value = get_bind_value($applying_change_sql, $table, $column, $olddata->{$column});
        my $new_value = get_bind_value($applying_change_sql, $table, $column, $newdata->{$column});
        if (column_values_differ($old_value, $new_value)) {
            push @changed_columns, $column;
            push @changed_values, $new_value;
        }
    }

    return 1 unless @changed_columns;

    my $updates = join q(, ),
        map { valid_identifer($_) . ' = ?' }
        @changed_columns;

    my $statement = "UPDATE $table SET $updates WHERE $conditions";

    # Placeholders appear in this order: SET values first, then key values.
    # The same list is used for logging and for execution, so --long-sql
    # shows exactly what is bound.
    my @bind_values = (@changed_values, @key_data);

    print localtime() . " : UPDATE $table\n" if $show_short_sql;
    show_long_sql($statement, \@bind_values) if $show_long_sql;

    $applying_change_sql->do($statement, @bind_values);

    return 1;
}

# Prints one timestamped line with the full statement text followed, in
# parentheses, by its bind values separated by spaces. Undefined bind values
# are shown as NULL.
#
#   $statement - SQL text
#   $args      - array ref of bind values
sub show_long_sql
{
    my ($statement, $args) = @_;

    my $timestamp = localtime;
    my $rendered_args = join ' ', map { $_ // 'NULL' } @$args;

    printf "%s : %s (%s)\n", $timestamp, $statement, $rendered_args;
}

# Conventional trailing true value. Execution never gets here, because the
# main section of the script ends with an explicit `exit`.
1;

#############################################################################
#
# Portions (C) 2003 Robert Kaye
#
# Formerly DBMirror.pl
# Contains the Database mirroring script.
# This script queries the dbmirror_pending table off the database specified
# (along with the associated schema) for updates that are pending on a
# specific host.  The database on that host is then updated with the changes.
#
#
#     Written by Steven Singer (ssinger@navtechinc.com)
#     (c) 2001-2002 Navtech Systems Support Inc.
# ALL RIGHTS RESERVED;
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose, without fee, and without a written agreement
# is hereby granted, provided that the above copyright notice and this
# paragraph and the following two paragraphs appear in all copies.
#
# IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
# DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
# LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
# DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
# ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
# PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
#
##############################################################################
